{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "9_FdPOuAkS5O"
   },
   "source": [
    "<center>\n",
    "    <p style=\"text-align:center\">\n",
    "        <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n",
    "        <br>\n",
    "        <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
    "        |\n",
    "        <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
    "        |\n",
    "        <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
    "    </p>\n",
    "</center>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "s79namx_lr87"
   },
   "source": [
    "# Langgraph - Parallel Evaluation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "-xjm2ngLkTxb"
   },
   "source": [
    "Parallel Evaluation in LangGraph\n",
    "In this tutorial, we’ll build a parallel execution workflow using LangGraph — ideal for scenarios where multiple evaluations or subtasks can run independently before being aggregated into a final decision.\n",
    "\n",
    "Our application generates a compelling product description and then runs three checks in parallel:\n",
    "\n",
    "- Safety Check: Is the content safe and non-violent?\n",
    "\n",
    "- Policy Compliance: Does it follow company policy?\n",
    "\n",
    "- Clarity Check: Is it understandable to a general audience?\n",
    "\n",
    "This pattern demonstrates how to fan out execution after a shared generation step, and aggregate results before producing a final output.\n",
    "\n",
    "We use Phoenix tracing to gain full visibility into each node execution, making it easy to debug or audit how decisions were made across the parallel branches."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install langgraph langchain langchain_community \"arize-phoenix\" arize-phoenix-otel openinference-instrumentation-langchain"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from getpass import getpass\n",
    "\n",
    "from langgraph.graph import END, START, StateGraph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ[\"OPENAI_API_KEY\"] = getpass(\"🔑 Enter your OpenAI API key: \")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "lzR6b-d7kgzF"
   },
   "source": [
    "# Configure Phoenix Tracing\n",
    "\n",
    "Make sure you go to https://app.phoenix.arize.com/ and generate an API key. This will allow you to trace your Langgraph application with Phoenix."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"PHOENIX_API_KEY\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n",
    "\n",
    "if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.otel import register\n",
    "\n",
    "tracer_provider = register(project_name=\"Parallel\", auto_instrument=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.chat_models import ChatOpenAI\n",
    "from typing_extensions import TypedDict"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "p-3h3a5ikiou"
   },
   "source": [
    "# LLM of choice"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0.3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Z3yGNuqrk3Tp"
   },
   "source": [
    "# Graph State Definition\n",
    "We define a State object to keep track of all data flowing through our LangGraph. This includes the input product name, the generated description, results of three independent evaluation checks, and the final aggregated output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class State(TypedDict):\n",
    "    product: str\n",
    "    description: str\n",
    "    safety_check: str\n",
    "    policy_check: str\n",
    "    clarity_check: str\n",
    "    final_output: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "TTOgjtPik8XI"
   },
   "source": [
    "# Node 1: Generate Product Description\n",
    "This node uses the LLM to write a compelling marketing-style description of the product. The output is stored in the description field of the graph state."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_description(state: State):\n",
    "    msg = llm.invoke(f\"Write a compelling product description for: {state['product']}\")\n",
    "    return {\"description\": msg.content}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Q6ZH2SovlF4v"
   },
   "source": [
    "# Node 2–4: Parallel Evaluation Checks\n",
    "After the product description is created, we fan out to three evaluators, each performing an independent check in parallel:\n",
    "\n",
    "**Safety Check**: Is the language safe and non-violent?\n",
    "\n",
    "**Policy Compliance**: Does it align with company guidelines?\n",
    "\n",
    "**Clarity Check**: Is it understandable by a general audience?\n",
    "\n",
    "Each function receives the same description as input and returns a binary decision (\"yes\" or \"no\")."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "company_policy = \"\"\"Company Product Description Policy\n",
    "1. Tone and Language\n",
    "Product descriptions must:\n",
    "\n",
    "Use clear, concise, and professional language.\n",
    "\n",
    "Maintain a friendly, helpful, and inclusive tone.\n",
    "\n",
    "Avoid slang, profanity, sarcasm, or overly casual phrasing.\n",
    "\n",
    "Be free from any offensive, discriminatory, or culturally insensitive terms.\n",
    "\n",
    "2. Truthfulness and Accuracy\n",
    "All product features, specifications, and benefits must be factually accurate.\n",
    "\n",
    "Claims (e.g. “fastest,” “best in class”) must be verifiable or supported by evidence (e.g., awards, benchmarks).\n",
    "\n",
    "Avoid:\n",
    "\n",
    "Misleading exaggerations.\n",
    "\n",
    "Unsubstantiated health or performance claims.\n",
    "\n",
    "Use of “guarantee” or “risk-free” unless legally backed.\n",
    "\n",
    "3. Compliance with Legal and Regulatory Guidelines\n",
    "Descriptions must not:\n",
    "\n",
    "Promise results (e.g., “will cure,” “guarantees success”) unless FDA/FTC-compliant.\n",
    "\n",
    "Use restricted terms in regulated industries (e.g., “organic,” “non-GMO,” “medical-grade”) without certification.\n",
    "\n",
    "Include comparative language against other brands unless objective and non-disparaging.\n",
    "\n",
    "4. Brand Voice and Alignment\n",
    "Product language must align with brand values:\n",
    "\n",
    "Empowerment\n",
    "\n",
    "Sustainability\n",
    "\n",
    "Innovation\n",
    "\n",
    "Trust\n",
    "\n",
    "Always mention unique selling points (USPs) when applicable, including differentiators that reflect the company’s mission.\n",
    "\n",
    "5. Inclusivity and Accessibility\n",
    "Avoid gendered, ageist, or culturally exclusive phrases.\n",
    "\n",
    "Use plain language that is accessible to readers at or below a 10th-grade level.\n",
    "\n",
    "Avoid niche references, idioms, or regionally specific expressions unless necessary.\n",
    "\n",
    "6. Formatting and Style\n",
    "Use proper grammar, punctuation, and spelling.\n",
    "\n",
    "Headings and bullet points should:\n",
    "\n",
    "Be consistently formatted.\n",
    "\n",
    "Begin with verbs when listing features (e.g., “Enhances clarity,” “Syncs automatically”).\n",
    "\n",
    "7. Restricted Content\n",
    "Product descriptions must not include:\n",
    "\n",
    "Violence, weapons, or militaristic comparisons.\n",
    "\n",
    "Sexually explicit or suggestive material.\n",
    "\n",
    "Mentions of illegal activities or substances.\n",
    "\n",
    "Political or religious endorsements.\n",
    "\n",
    "8. SEO and Discoverability (Optional)\n",
    "If optimizing for search:\n",
    "\n",
    "Use approved keywords (maintain keyword density <2.5%).\n",
    "\n",
    "Do not “stuff” keywords unnaturally or compromise readability.\n",
    "\n",
    "Include relevant tags in a way that fits the context.\n",
    "\n",
    "9. Call to Action (CTA) Guidelines\n",
    "CTAs should:\n",
    "\n",
    "Be clear and action-oriented (e.g., “Shop Now,” “Experience the difference”).\n",
    "\n",
    "Avoid pushy language (e.g., “Act before it’s gone!”).\n",
    "\n",
    "Align with the tone: confident but not aggressive.\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def check_safety(state: State):\n",
    "    msg = llm.invoke(\n",
    "        f\"Is this product description safe and non-violent? Answer yes or no.\\n\\n{state['description']}\"\n",
    "    )\n",
    "    return {\"safety_check\": msg.content}\n",
    "\n",
    "\n",
    "def check_policy(state: State):\n",
    "    msg = llm.invoke(\n",
    "        f\"Does this product description comply with our company policy? Here is the company policy: {company_policy}. Answer yes or no.\\n\\n{state['description']}\"\n",
    "    )\n",
    "    return {\"policy_check\": msg.content}\n",
    "\n",
    "\n",
    "def check_clarity(state: State):\n",
    "    msg = llm.invoke(\n",
    "        f\"Is this description clear and understandable to a 10th-grade reader? Answer yes or no.\\n\\n{state['description']}\"\n",
    "    )\n",
    "    return {\"clarity_check\": msg.content}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "--BmhL3tlMMU"
   },
   "source": [
    "# Node 5: Aggregate the Results\n",
    "Once the checks complete, this node gathers their responses. If all checks return \"yes\", the product description is approved. Otherwise, it’s flagged as rejected, along with reasons."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def aggregate_results(state: State):\n",
    "    if (\n",
    "        \"yes\" in state[\"safety_check\"].strip().lower()\n",
    "        and \"yes\" in state[\"policy_check\"].strip().lower()\n",
    "        and \"yes\" in state[\"clarity_check\"].strip().lower()\n",
    "    ):\n",
    "        return {\"final_output\": state[\"description\"]}\n",
    "    return {\n",
    "        \"final_output\": \"REJECTED: One or more checks failed.\\n\"\n",
    "        f\"Safety: {state['safety_check']}, Policy: {state['policy_check']}, Clarity: {state['clarity_check']}\"\n",
    "    }"
   ]
  },
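  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of how free-text replies can be mapped to pass/fail before aggregation. `parse_verdict` is a hypothetical helper, not part of LangGraph or Phoenix, and it assumes the model leads with its yes/no answer, as the prompts request. Matching on the first word avoids false positives from words that merely contain \"yes\", such as \"eyes\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "\n",
    "\n",
    "def parse_verdict(reply: str) -> bool:\n",
    "    # Hypothetical helper: True only when the reply's first word is \"yes\".\n",
    "    match = re.match(r\"\\W*(\\w+)\", reply.strip().lower())\n",
    "    return bool(match) and match.group(1) == \"yes\"\n",
    "\n",
    "\n",
    "for reply in [\"Yes.\", \"yes, it is safe\", \"No.\", \"No, it strains the eyes.\"]:\n",
    "    print(f\"{reply!r} -> {parse_verdict(reply)}\")"
   ]
  },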
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "pm-KYiZzlRxE"
   },
   "source": [
    "# Building the Parallel Evaluation Graph\n",
    "With all our nodes defined, we now assemble them into a LangGraph using StateGraph.\n",
    "\n",
    "**Start → Description**: We begin by generating the product description.\n",
    "\n",
    "**Fan Out Checks**: The output fans out into three parallel paths — safety, policy, and clarity checks — enabling efficient, simultaneous validation.\n",
    "\n",
    "**Converge → Aggregate**: Once all checks complete, the results converge into a final aggregation node that determines whether to approve or reject the description.\n",
    "\n",
    "**End**: The final result is produced.\n",
    "\n",
    "This setup showcases LangGraph’s ability to manage parallelism and convergence, streamlining complex workflows while remaining transparent and modular."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "builder = StateGraph(State)\n",
    "\n",
    "builder.add_node(\"generate_description\", generate_description)\n",
    "builder.add_node(\"check_safety\", check_safety)\n",
    "builder.add_node(\"check_policy\", check_policy)\n",
    "builder.add_node(\"check_clarity\", check_clarity)\n",
    "builder.add_node(\"aggregate_results\", aggregate_results)\n",
    "\n",
    "# Description generation first\n",
    "builder.add_edge(START, \"generate_description\")\n",
    "\n",
    "# Then fan out for parallel checks\n",
    "builder.add_edge(\"generate_description\", \"check_safety\")\n",
    "builder.add_edge(\"generate_description\", \"check_policy\")\n",
    "builder.add_edge(\"generate_description\", \"check_clarity\")\n",
    "\n",
    "# All checks go to the aggregator\n",
    "builder.add_edge(\"check_safety\", \"aggregate_results\")\n",
    "builder.add_edge(\"check_policy\", \"aggregate_results\")\n",
    "builder.add_edge(\"check_clarity\", \"aggregate_results\")\n",
    "\n",
    "# Final result\n",
    "builder.add_edge(\"aggregate_results\", END)\n",
    "\n",
    "workflow = builder.compile()"
   ]
  },
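  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The fan-out and fan-in pattern above can be illustrated without LangGraph or an LLM. The sketch below is a conceptual analogy in plain Python, not LangGraph's implementation; `fan_out` and the two toy checks are hypothetical names. Each branch reads the same state and returns a partial update for its own key, and the updates are merged before the next step runs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "\n",
    "def check_length(state):\n",
    "    # Toy check: passes when the description is under 80 characters.\n",
    "    return {\"length_check\": \"yes\" if len(state[\"description\"]) < 80 else \"no\"}\n",
    "\n",
    "\n",
    "def check_calm(state):\n",
    "    # Toy check: passes unless the description is written in all caps.\n",
    "    return {\"calm_check\": \"no\" if state[\"description\"].isupper() else \"yes\"}\n",
    "\n",
    "\n",
    "def fan_out(state, branches):\n",
    "    # Run every branch on the same input state, then merge the partial updates.\n",
    "    with ThreadPoolExecutor() as pool:\n",
    "        updates = list(pool.map(lambda branch: branch(state), branches))\n",
    "    merged = dict(state)\n",
    "    for update in updates:\n",
    "        merged.update(update)\n",
    "    return merged\n",
    "\n",
    "\n",
    "print(fan_out({\"description\": \"A light, durable water bottle.\"}, [check_length, check_calm]))"
   ]
  },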
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "m1-XyQI0lZDD"
   },
   "source": [
    "# Example Usage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "state = workflow.invoke({\"product\": \"Smart glasses that project your calendar\"})\n",
    "print(state[\"final_output\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "state = workflow.invoke(\n",
    "    {\"product\": \"Headphones with noise cancellation, transparency, and other advanced features.\"}\n",
    ")\n",
    "print(state[\"final_output\"])\n",
    "state = workflow.invoke({\"product\": \"Smart fridge with advanced features.\"})\n",
    "print(state[\"final_output\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "UQJDA2SNlbIn"
   },
   "source": [
    "# Make sure to view your traces in Phoenix!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "IWPeOpzwkd56"
   },
   "source": [
    "# Let's add some Evaluations (Evals)\n",
    "\n",
    "In this section we will evaluate the accuracy of our safety, policy, and clarity checkers with another LLM call."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import phoenix as px\n",
    "\n",
    "df = px.Client().get_spans_dataframe(\"name == 'LangGraph'\", project_name=\"Parallel\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.to_csv(\"parallel_evals.csv\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "AdqvqRgcaZkU"
   },
   "source": [
    "# Custom Eval Template\n",
    "\n",
    "Here we define a custom eval template, designed to evaluate the policy, clarity, and safety checkers' decisions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "TEMPLATE = \"\"\" You must decide whether the clarity, policy, and safety checkers made the right decision based on the generated descriptions.\n",
    "Check if the policy checker's decision correctly reflects whether the generatead description complies with the policy: {company_policy}.\n",
    "Check if the clarity checker's decision correctly reflects whether the generated description is understandable to a 10th-grade reader.\n",
    "Check if the safety checker's decision correctly reflects whether the generated description is safe and non-violent.\n",
    "\n",
    "generated description: {description}\n",
    "decisions: {decisions}\n",
    "\n",
    "Output 1/3 if one decision is correct, 2/3 if two decisions are correct, 3/3 if all decisions are correct, and 0/3 if all decisions are incorrect.\n",
    "Explan your reasoning.\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "wrYc42ebaiZ3"
   },
   "source": [
    "# Generate Evals"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "import pandas as pd\n",
    "\n",
    "from phoenix.evals import OpenAIModel, llm_classify\n",
    "\n",
    "\n",
    "def unpack(row):\n",
    "    blob = json.loads(row[\"attributes.output.value\"])\n",
    "    # pull the free-text description\n",
    "    description = blob.get(\"description\", \"\")\n",
    "    # collapse the three yes/no flags into one readable string\n",
    "    decisions_dict = {\n",
    "        \"policy_checker\": blob.get(\"policy_check\", \"No.\"),\n",
    "        \"clarity_checker\": blob.get(\"clarity_check\", \"No.\"),\n",
    "        \"safety_checker\": blob.get(\"safety_check\", \"No.\"),\n",
    "    }\n",
    "    # join into the form   \"policy_checker: Yes., clarity_checker: No.,  ...\"\n",
    "    decisions = \", \".join(f\"{k}: {v}\" for k, v in decisions_dict.items())\n",
    "    return pd.Series({\"description\": description, \"decisions\": decisions})\n",
    "\n",
    "\n",
    "df[[\"description\", \"decisions\"]] = df.apply(unpack, axis=1)\n",
    "\n",
    "# 3. make sure every {placeholder} in your TEMPLATE exists as a column --------\n",
    "df[\"company_policy\"] = company_policy  # now {company_policy} will resolve\n",
    "\n",
    "# 4. run the eval --------------------------------------------------------------\n",
    "\n",
    "# We treat 0/3 … 3/3 as four categorical classes\n",
    "rails = [\"0/3\", \"1/3\", \"2/3\", \"3/3\"]\n",
    "\n",
    "eval_results = llm_classify(\n",
    "    dataframe=df,\n",
    "    template=TEMPLATE,\n",
    "    model=OpenAIModel(model=\"gpt-4o\"),  # or any supported model\n",
    "    rails=rails,\n",
    "    include_prompt=True,\n",
    "    include_response=True,\n",
    "    verbose=True,\n",
    "    provide_explanation=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "eval_results.drop(\n",
    "    columns=[\"prompt\", \"exceptions\", \"execution_status\", \"execution_seconds\", \"response\"],\n",
    "    inplace=True,\n",
    ")\n",
    "eval_results"
   ]
  },
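  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `k/3` labels can also be turned into numeric scores, which makes it easier to average checker accuracy across runs. This is a minimal sketch using only the standard library; `label_to_score` is a hypothetical helper, and any label that is not a fraction (for example, an unparseable model reply) maps to `None`. To apply it to the results above, something like `eval_results[\"score\"] = eval_results[\"label\"].map(label_to_score)` should work, assuming the `label` column returned by `llm_classify`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fractions import Fraction\n",
    "\n",
    "\n",
    "def label_to_score(label):\n",
    "    # Convert a rail such as \"2/3\" into a float; return None for anything else.\n",
    "    try:\n",
    "        return float(Fraction(label))\n",
    "    except (TypeError, ValueError, ZeroDivisionError):\n",
    "        return None\n",
    "\n",
    "\n",
    "for label in [\"0/3\", \"1/3\", \"2/3\", \"3/3\", \"NOT_PARSABLE\"]:\n",
    "    print(label, label_to_score(label))"
   ]
  },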
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "L3fH44adantX"
   },
   "source": [
    "# Export Evals to Phoenix!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.client import AsyncClient\n",
    "\n",
    "px_client = AsyncClient()\n",
    "await px_client.spans.log_span_annotations_dataframe(\n",
    "    dataframe=eval_results,\n",
    "    annotation_name=\"Checker Accuracy\",\n",
    "    annotator_kind=\"LLM\",\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
